package com.colin.distributekafka.admin;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * @author zhaodong 2021/5/30 16:4233
 * email: colinzhaodong@gmail.com
 * desc: Runnable samples of the Kafka {@link AdminClient} API: creating, deleting, listing and
 * describing topics, reading and altering topic configuration, and increasing partitions.
 *
 * <p>All samples operate on {@link #TOPIC_NAME} and share one lazily created client, see
 * {@link #init()}. Every sample blocks until the broker has answered, so failures surface as
 * exceptions instead of being silently dropped.
 */
public class AdminSample {
    /** Name of the topic every sample operates on. */
    public static final String TOPIC_NAME = "colin-topic";

    /** System property that, when set, overrides the bootstrap servers used by the shared client. */
    private static final String BOOTSTRAP_SERVERS_PROPERTY = "kafka.bootstrap.servers";

    /** Bootstrap servers used when {@link #BOOTSTRAP_SERVERS_PROPERTY} is not set. */
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "47.111.16.185:9093";

    /**
     * Runs one of the samples; swap the active call for any of the commented-out ones to try it.
     *
     * @param args unused
     * @throws Exception if the selected admin operation fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the shared client once the selected sample has finished,
        // releasing its network resources before the JVM exits.
        try (AdminClient ignored = init()) {
//            System.out.println(ignored);
//            createTopic();
//            delTopics();
//            topicList();
            describeTopics();
//            describeConfig();
//            alterConfig();
//            increPartition(3);
        }
    }

    /**
     * Increases the partition count of {@link #TOPIC_NAME} and waits for the broker to confirm.
     *
     * @param num the partition count passed to {@link NewPartitions#increaseTo(int)}
     * @throws ExecutionException   if the request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void increPartition(int num) throws ExecutionException, InterruptedException {
        Map<String, NewPartitions> request =
                Collections.singletonMap(TOPIC_NAME, NewPartitions.increaseTo(num));
        init().createPartitions(request).all().get();
    }

    /**
     * Sets {@code preallocate=true} on {@link #TOPIC_NAME} and waits for the change to be applied.
     *
     * <p>This uses the legacy {@code alterConfigs} call. NOTE(review): the legacy call is
     * understood to replace the topic's whole set of overrides rather than merge into it, and
     * clients from 2.3 on offer {@code incrementalAlterConfigs} instead (sketched in the comment
     * at the end of the method body) - confirm against the client version on the classpath.
     *
     * @throws Exception if the request fails or the calling thread is interrupted while waiting
     */
    @SuppressWarnings("deprecation") // alterConfigs is deprecated on newer clients; kept for compatibility
    public static void alterConfig() throws Exception {
        AdminClient adminClient = init();

        // The request maps each resource to the configuration that should be applied to it.
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        Config config = new Config(Collections.singletonList(new ConfigEntry("preallocate", "true")));
        Map<ConfigResource, Config> configs = Collections.singletonMap(topicResource, config);

        AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configs);
        // The call is asynchronous: block here so a rejected change is reported to the caller.
        alterConfigsResult.all().get();

        /*
            API introduced for versions 2.3 and above:
         */
//        Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>();
//        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
//        AlterConfigOp alterConfigOp =
//                new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
//        configMaps.put(configResource, Collections.singletonList(alterConfigOp));
//
//        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
//        alterConfigsResult.all().get();
    }

    /**
     * Prints the configuration of {@link #TOPIC_NAME}, one line per described resource.
     *
     * <p>Abbreviated example of the printed value (recorded for a topic named
     * {@code zhaodong-topic}):
     * <pre>
     * ConfigResource(type=TOPIC, name='zhaodong-topic') ,
     * Config(entries=[
     *   ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG,
     *               isSensitive=false, isReadOnly=false, synonyms=[]),
     *   ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG,
     *               isSensitive=false, isReadOnly=false, synonyms=[]),
     *   ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG,
     *               isSensitive=false, isReadOnly=false, synonyms=[]),
     *   ... (cleanup.policy, retention.ms, min.insync.replicas, max.message.bytes, etc.)
     * ])
     * </pre>
     *
     * @throws Exception if the request fails or the calling thread is interrupted while waiting
     */
    public static void describeConfig() throws Exception {
        // TODO placeholder: describing a BROKER resource will be covered with the cluster setup
//        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, TOPIC_NAME);

        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        Map<ConfigResource, Config> configsByResource =
                init().describeConfigs(Collections.singletonList(topicResource)).all().get();
        configsByResource.forEach((resource, config) ->
                System.out.println("configResource : " + resource + " , Config : " + config));
    }

    /**
     * Creates {@link #TOPIC_NAME} with one partition and a replication factor of one, and waits
     * for the broker to confirm.
     *
     * @throws IllegalStateException if the creation fails or the calling thread is interrupted
     *                               while waiting; the original exception is kept as the cause
     */
    public static void createTopic() {
        short replicationFactor = 1;
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, replicationFactor);
        CreateTopicsResult topicsResult = init().createTopics(Collections.singletonList(newTopic));
        try {
            // createTopics is asynchronous; without waiting, failures would go unnoticed and the
            // JVM could exit before the request completes.
            topicsResult.all().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while creating topic " + TOPIC_NAME, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to create topic " + TOPIC_NAME, e);
        }
    }

    /**
     * Deletes {@link #TOPIC_NAME} and waits for the broker to confirm.
     *
     * @throws Exception if the request fails or the calling thread is interrupted while waiting
     */
    public static void delTopics() throws Exception {
        init().deleteTopics(Collections.singletonList(TOPIC_NAME)).all().get();
    }

    /**
     * Prints the name of every topic, internal topics included, one per line.
     *
     * @throws ExecutionException   if the request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void topicList() throws ExecutionException, InterruptedException {
        // Ask for internal topics as well; the options only take effect when passed to listTopics.
        ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
        Set<String> topicNames = init().listTopics(options).names().get();
        topicNames.forEach(System.out::println);
    }

    /**
     * Prints the description of {@link #TOPIC_NAME}.
     *
     * @throws Exception if the request fails or the calling thread is interrupted while waiting
     */
    public static void describeTopics() throws Exception {
        Map<String, TopicDescription> descriptions =
                init().describeTopics(Collections.singletonList(TOPIC_NAME)).all().get();
        descriptions.forEach((name, description) ->
                System.out.println("name ："+name+" , desc: "+ description));
    }

    /**
     * Enum-based singleton holding the shared client; the client is created when the enum is
     * first initialised, i.e. on the first call to {@link AdminSample#init()}.
     */
    private enum KafkaAdminHolder {
        /**
         * The single holder of the Kafka admin client.
         */
        INSTANCE;

        private final AdminClient client;

        KafkaAdminHolder() {
            Properties props = new Properties();
            props.setProperty(
                    AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                    System.getProperty(BOOTSTRAP_SERVERS_PROPERTY, DEFAULT_BOOTSTRAP_SERVERS));
            client = AdminClient.create(props);
        }

        private AdminClient client() {
            return client;
        }
    }

    /**
     * Returns the shared admin client, creating it on first use.
     *
     * <p>The same instance is returned on every call, so closing it affects all later samples.
     * The bootstrap servers default to the built-in address and can be overridden with the
     * {@code kafka.bootstrap.servers} system property before the first call.
     *
     * @return the shared {@link AdminClient}
     */
    public static AdminClient init() {
        return KafkaAdminHolder.INSTANCE.client();
    }
}
